{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Dataframe Concepts_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## _by Jeff Levy (jlevy@urban.org) & Alex Engler (aengler@urban.org)_\n",
    "\n",
    "_Last Updated: 31 Jul, Spark v2.1_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: This guide will explore some basic concepts necessary for working with many dataframe operations, in particular `groupBy` and `persist`._\n",
    "\n",
    "_Main operations used: read.load, withColumn, groupBy, persist, cache, unpersist_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Spark does its computations in what is called a _lazy_ fashion.  That is, when you tell it to do things to your data, it _doesn't do them right away._  Instead it checks that they're valid commands, then stacks them up until you actually ask it to return a value or a dataframe to you.  This stack of commands is called a _lineage_ in Spark, and it means we can think of a Spark dataframe object as a list of instructions built on top of your original data.\n",
    "\n",
    "Let's see it in action.  First we'll load up the same dataframe we did in basics 1:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read.format('com.databricks.spark.csv').options(header='False', inferschema='true', sep='|').load('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll take a subset of the columns and rename them, like we did in the first tutorial: "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_lim = df.select('_c0','_c1','_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13')\n",
    "\n",
    "old_names = ['_c0','_c1','_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13']\n",
    "new_names = ['loan_id','period','servicer_name', 'new_int_rt', 'act_endg_upb', 'loan_age', 'mths_remng', 'aj_mths_remng', 'dt_matr', 'cd_msa', 'delq_sts', 'flag_mod', 'cd_zero_bal', 'dt_zero_bal']\n",
    "for old, new in zip(old_names, new_names):\n",
    "    df_lim = df_lim.withColumnRenamed(old, new)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But now let's try some numerical operations on a column. We can use the `.withColumn` method to create a new dataframe that also has an additional calculated variable, in this case the sum of `loan_age` and months remaining (`mths_remng`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "## Add a column named 'loan_length' to the existing dataframe:\n",
    "df_lim = df_lim.withColumn('loan_length', df_lim['loan_age'] + df_lim['mths_remng'])\n",
    "\n",
    "## Group the new dataframe by servicer name:\n",
    "df_grp = df_lim.groupBy('servicer_name')\n",
    "\n",
    "## Compute average loan age, months remaining, and loan length by servicer:\n",
    "df_avg = df_grp.avg('loan_age', 'mths_remng', 'loan_length')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we performed a simple math operation (adding `loan_age` to `mths_remng`), then performed a `groupBy` operation over the entries in `servicer_name` (more on `groupBy` in a minute) while asking it to calculate averages for three numeric columns across each servicer.  \n",
    "\n",
    "However, if you actually ran the code, you probably noticed that the code block finished nearly instantly, despite there being over 3.5 million rows of data.  This is an example of _lazy_ computing: **nothing was actually computed here.**  At the moment, we're just creating a list of instructions. All pySpark did was make sure they were valid instructions.  Now let's see what happens if we tell it to `show` us the results:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+------------------+------------------+\n",
      "|       servicer_name|       avg(loan_age)|   avg(mths_remng)|  avg(loan_length)|\n",
      "+--------------------+--------------------+------------------+------------------+\n",
      "|  QUICKEN LOANS INC.|-0.08899247348614438| 358.5689787889155|358.47998631542936|\n",
      "|NATIONSTAR MORTGA...| 0.39047125841532887| 359.5821853961678| 359.9726566545831|\n",
      "|                null|  5.6264681794400015|354.21486809483747| 359.8413362742775|\n",
      "|WELLS FARGO BANK,...|  0.6704475572258285|359.25937820293814|359.92982576016396|\n",
      "|FANNIE MAE/SETERU...|   9.333333333333334| 350.6666666666667|             360.0|\n",
      "|DITECH FINANCIAL LLC|   5.147629653197582| 354.7811008590519|359.92873051224944|\n",
      "|SENECA MORTGAGE S...| -0.2048814025438295|360.20075627363354| 359.9958748710897|\n",
      "|SUNTRUST MORTGAGE...|  0.8241234756097561| 359.1453887195122|  359.969512195122|\n",
      "|ROUNDPOINT MORTGA...|   5.153408024034549| 354.8269387244163|359.98034674845087|\n",
      "|      PENNYMAC CORP.| 0.14966740576496673| 359.8470066518847|359.99667405764967|\n",
      "|PHH MORTGAGE CORP...|  0.9780420860018298|359.02195791399816|             360.0|\n",
      "|MATRIX FINANCIAL ...|   6.566794707639778| 353.4229620145113|359.98975672215107|\n",
      "|               OTHER| 0.11480465916297489| 359.8345750772193|359.94937973638224|\n",
      "|  CITIMORTGAGE, INC.|   0.338498789346247|359.41670702179175|  359.755205811138|\n",
      "|PINGORA LOAN SERV...|   7.573573382530696|352.40886824861633|  359.982441631147|\n",
      "|JP MORGAN CHASE B...|  1.6553418987669224| 358.3384495990342|359.99379149780117|\n",
      "|      PNC BANK, N.A.|  1.1707779886148009|358.78747628083494| 359.9582542694497|\n",
      "|FREEDOM MORTGAGE ...|    8.56265812109968|351.29583403609377|359.85849215719344|\n",
      "+--------------------+--------------------+------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_avg.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That takes a bit longer to run, because when you executed `show` you asked for a dataframe to be returned to you, which meant **Spark went back and calculated the three previous operations.**  You could have done any number of intermediate steps similar to those before calling `show` and they all would have been lazy operations that finished nearly instantly, until `show` ran them all."
   ]
  },
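  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick check on the idea of a lineage, the cell below is a minimal sketch that asks Spark to print the plan it has recorded for `df_avg`.  It assumes the cells above have already been run in this session.  `explain()` is a standard dataframe method that prints the plan without running the query; the exact text it prints depends on your Spark version and data source."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "## Print the plan Spark has recorded for df_avg; the query itself is not run:\n",
    "df_avg.explain()"
   ]
  },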
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now this would just be a background peculiarity, except that we have some control over the process.  If you imagine your _lineage_ as a straight line of instructions leading from your source data to your output, **we can use the `persist()` method to create a point for branching.**  Essentially it tells Spark \"follow the instructions to this point, then _hold these results_ because I'm going to come back to them again.\"\n",
    "\n",
    "Let's redo the previous code block with a `persist()`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_keep = df_lim.withColumn('loan_length', df_lim['loan_age'] + df_lim['mths_remng'])\n",
    "\n",
    "df_keep.persist()\n",
    "\n",
    "df_grp = df_keep.groupBy('servicer_name')\n",
    "df_avg = df_grp.avg('loan_age', 'mths_remng', 'loan_length')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `persist` command adds very little overhead in this case, finishing in well under a second.  Now we call `show` again to force it to calculate the averages by group:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+------------------+------------------+\n",
      "|       servicer_name|       avg(loan_age)|   avg(mths_remng)|  avg(loan_length)|\n",
      "+--------------------+--------------------+------------------+------------------+\n",
      "|  QUICKEN LOANS INC.|-0.08899247348614438| 358.5689787889155|358.47998631542936|\n",
      "|NATIONSTAR MORTGA...| 0.39047125841532887| 359.5821853961678| 359.9726566545831|\n",
      "|                null|  5.6264681794400015|354.21486809483747| 359.8413362742775|\n",
      "|WELLS FARGO BANK,...|  0.6704475572258285|359.25937820293814|359.92982576016396|\n",
      "|FANNIE MAE/SETERU...|   9.333333333333334| 350.6666666666667|             360.0|\n",
      "|DITECH FINANCIAL LLC|   5.147629653197582| 354.7811008590519|359.92873051224944|\n",
      "|SENECA MORTGAGE S...| -0.2048814025438295|360.20075627363354| 359.9958748710897|\n",
      "|SUNTRUST MORTGAGE...|  0.8241234756097561| 359.1453887195122|  359.969512195122|\n",
      "|ROUNDPOINT MORTGA...|   5.153408024034549| 354.8269387244163|359.98034674845087|\n",
      "|      PENNYMAC CORP.| 0.14966740576496673| 359.8470066518847|359.99667405764967|\n",
      "|PHH MORTGAGE CORP...|  0.9780420860018298|359.02195791399816|             360.0|\n",
      "|MATRIX FINANCIAL ...|   6.566794707639778| 353.4229620145113|359.98975672215107|\n",
      "|               OTHER| 0.11480465916297489| 359.8345750772193|359.94937973638224|\n",
      "|  CITIMORTGAGE, INC.|   0.338498789346247|359.41670702179175|  359.755205811138|\n",
      "|PINGORA LOAN SERV...|   7.573573382530696|352.40886824861633|  359.982441631147|\n",
      "|JP MORGAN CHASE B...|  1.6553418987669224| 358.3384495990342|359.99379149780117|\n",
      "|      PNC BANK, N.A.|  1.1707779886148009|358.78747628083494| 359.9582542694497|\n",
      "|FREEDOM MORTGAGE ...|    8.56265812109968|351.29583403609377|359.85849215719344|\n",
      "+--------------------+--------------------+------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_avg.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Showing the `groupBy` averages this way took a bit longer because of the `persist` overhead.  But now let's back up and, in addition to the means, also get the sums from our `groupBy` object:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_sum = df_grp.sum('new_int_rt', 'loan_age', 'mths_remng', 'cd_zero_bal', 'loan_length')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That was the *lazy* portion, now we make it execute:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-------------+---------------+----------------+----------------+\n",
      "|       servicer_name|     sum(new_int_rt)|sum(loan_age)|sum(mths_remng)|sum(cd_zero_bal)|sum(loan_length)|\n",
      "+--------------------+--------------------+-------------+---------------+----------------+----------------+\n",
      "|  QUICKEN LOANS INC.|  101801.76500000055|        -2081|        8384777|            null|         8382696|\n",
      "|NATIONSTAR MORTGA...|  40287.497999999934|         3770|        3471766|               2|         3475536|\n",
      "|                null|1.3139130895007337E7|     17690263|     1113692280|           16932|      1131382543|\n",
      "|WELLS FARGO BANK,...|  187326.36499999996|        29436|       15773283|            null|        15802719|\n",
      "|FANNIE MAE/SETERU...|                26.6|           56|           2104|            null|            2160|\n",
      "|DITECH FINANCIAL LLC|   39531.70999999991|        48537|        3345231|              41|         3393768|\n",
      "|SENECA MORTGAGE S...|   24093.55999999997|        -1192|        2095648|            null|         2094456|\n",
      "|SUNTRUST MORTGAGE...|  21530.767999999884|         4325|        1884795|            null|         1889120|\n",
      "|ROUNDPOINT MORTGA...|   67708.25999999994|        82336|        5669070|              74|         5751406|\n",
      "|      PENNYMAC CORP.|  15209.139999999992|          540|        1298328|            null|         1298868|\n",
      "|PHH MORTGAGE CORP...|   9086.066000000006|         2138|         784822|            null|          786960|\n",
      "|MATRIX FINANCIAL ...|   19212.93299999999|        30772|        1656140|              16|         1686912|\n",
      "|               OTHER|    904855.043999986|        25163|       78868902|              21|        78894065|\n",
      "|  CITIMORTGAGE, INC.|  16939.329999999998|         1398|        1484391|            null|         1485789|\n",
      "|PINGORA LOAN SERV...|   64224.70499999985|       119049|        5539515|             111|         5658564|\n",
      "|JP MORGAN CHASE B...|  50187.154999999984|        19197|        4155651|            null|         4174848|\n",
      "|      PNC BANK, N.A.|            6911.725|         1851|         567243|               1|          569094|\n",
      "|FREEDOM MORTGAGE ...|   24800.60499999998|        50768|        2082833|              60|         2133601|\n",
      "+--------------------+--------------------+-------------+---------------+----------------+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_sum.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That was dramatically faster than the calculation showing the averages (we benchmarked it at 1.49 seconds versus over 18 seconds).  This is because Spark kept the intermediate results up to `persist()`, from when we calculated the averages, and thus only had to run the code that came after that.  We can now do as many different branches of operations as we want stemming from `df_keep`, and since we persisted it, all the code before the `persist()` won't be executed again.\n",
    "\n",
    "There is no need for persisting if there is no branching.  In fact, as we saw, `persist` adds a bit of overhead to the process, and so is actually a hindrance if you're not going to be utilizing the branch point.  As a matter of good practice, and to free up more resources, you can call `.unpersist()` on a persisted object to drop it from storage when done with it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[loan_id: bigint, period: string, servicer_name: string, new_int_rt: double, act_endg_upb: double, loan_age: int, mths_remng: int, aj_mths_remng: int, dt_matr: string, cd_msa: int, delq_sts: string, flag_mod: string, cd_zero_bal: int, dt_zero_bal: string, loan_length: int]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_keep.unpersist();"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(The trailing `;` simply suppresses the output from the command. We don't need to see the summary of what we just unpersisted.)\n",
    "\n",
    "Also note that for dataframes `cache()` is simply `persist()` called with the default storage level, which in Spark 2.x keeps the data in memory and lets Spark spill whatever doesn't fit to disk.  If you want a different trade-off, such as memory-only storage for the fastest recall, pass an explicit storage level to `persist()`.  Memory-only storage only pays off if the dataframe you are asking Spark to hold is small enough to fit in the memory available across your nodes, so use it with care."
   ]
  },
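  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the storage choice explicit, `persist()` accepts a storage level.  The cell below is a minimal sketch, assuming `df_keep` from the cells above still exists and has already been unpersisted.  `StorageLevel` is imported from `pyspark`; the level shown is one of the standard ones, and what the two `print` calls display may differ between Spark versions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark import StorageLevel\n",
    "\n",
    "## Keep the dataframe in memory, spilling to disk whatever doesn't fit:\n",
    "df_keep.persist(StorageLevel.MEMORY_AND_DISK)\n",
    "\n",
    "## Check whether the dataframe is marked as persisted, and at what level:\n",
    "print(df_keep.is_cached)\n",
    "print(df_keep.storageLevel)\n",
    "\n",
    "## Release the storage once the branch point is no longer needed:\n",
    "df_keep.unpersist();"
   ]
  },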
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And finally, a bit more on `groupBy`.  Hopefully the usage above has given you some insight into how it works.  In short, `groupBy` is the vehicle for aggregation in a dataframe.  A `groupBy` object is, in itself, incomplete.  Consider the line in the code block where we introduced `persist()` above that looks like this:\n",
    "\n",
    "`df_grp = df_keep.groupBy('servicer_name')`\n",
    "\n",
    "This generates a `groupBy` object where the data is grouped around the unique values found in the column `servicer_name`, but it is just a foundation.  It is like the sentence _\"We are going to group our data up by the unique values found in column `servicer_name`, and then...\"_  The next line of code contains the rest:\n",
    "\n",
    "`df_avg = df_grp.avg('loan_age', 'mths_remng', 'loan_length')`\n",
    "\n",
    "Or to finish the sentence, _\"... calculate the averages for these three columns within each group.\"_\n"
   ]
  },
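  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A grouped object can also be finished with more than one kind of aggregation at a time.  The cell below is a minimal sketch using `agg`, which accepts several aggregate expressions in one call.  It assumes `df_keep` from the cells above; the output column names `avg_loan_age` and `total_loan_length` are our own labels, chosen only for illustration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F\n",
    "\n",
    "## Start the sentence: group the data by servicer...\n",
    "df_grp = df_keep.groupBy('servicer_name')\n",
    "\n",
    "## ...and finish it: one average and one sum within each group:\n",
    "df_agg = df_grp.agg(F.avg('loan_age').alias('avg_loan_age'),\n",
    "                    F.sum('loan_length').alias('total_loan_length'))\n",
    "\n",
    "## Nothing has been computed yet; show() forces the calculation:\n",
    "df_agg.show(5)"
   ]
  },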
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": "text/x-ipython",
   "file_extension": ".py",
   "mimetype": "text/x-ipython",
   "name": "python",
   "pygments_lexer": "python",
   "version": "2.7.12\n"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
